{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## \\[RFC\\] How DataFrames (DF) and DataPipes (DP) work together"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from importlib import reload\n",
    "import torch\n",
    "reload(torch)\n",
    "from torch.utils.data import IterDataPipe"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Example IterDataPipe\n",
    "class ExampleIterPipe(IterDataPipe):\n",
    "    def __init__(self, range = 20) -> None:\n",
    "        self.range = range\n",
    "    def __iter__(self):\n",
    "        yield from self.range\n",
    "\n",
    "def get_dataframes_pipe(range = 10, dataframe_size = 7):\n",
    "    return ExampleIterPipe(range = range).map(lambda i: (i, i % 3))._to_dataframes_pipe(columns = ['i','j'], dataframe_size = dataframe_size)\n",
    "\n",
    "def get_regular_pipe(range = 10):\n",
    "    return ExampleIterPipe(range = range).map(lambda i: (i, i % 3))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Doesn't matter how DF composed internally, iterator over DF Pipe gives single rows to user. This is similar to regular DataPipe."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "DataFrames Pipe\n",
      "(0, 0)\n",
      "(1, 1)\n",
      "(2, 2)\n",
      "(3, 0)\n",
      "(4, 1)\n",
      "(5, 2)\n",
      "(6, 0)\n",
      "(7, 1)\n",
      "(8, 2)\n",
      "(9, 0)\n",
      "Regular DataPipe\n",
      "(0, 0)\n",
      "(1, 1)\n",
      "(2, 2)\n",
      "(3, 0)\n",
      "(4, 1)\n",
      "(5, 2)\n",
      "(6, 0)\n",
      "(7, 1)\n",
      "(8, 2)\n",
      "(9, 0)\n"
     ]
    }
   ],
   "source": [
    "print('DataFrames Pipe')\n",
    "dp = get_dataframes_pipe()\n",
    "for i in dp:\n",
    "    print(i)\n",
    "\n",
    "print('Regular DataPipe')\n",
    "dp = get_regular_pipe()\n",
    "for i in dp:\n",
    "    print(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can iterate over raw DF using `raw_iterator`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "   i  j\n",
      "0  0  0\n",
      "1  1  1\n",
      "2  2  2\n",
      "3  3  0\n",
      "4  4  1\n",
      "5  5  2\n",
      "6  6  0\n",
      "   i  j\n",
      "0  7  1\n",
      "1  8  2\n",
      "2  9  0\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe()\n",
    "for i in dp.raw_iterator():\n",
    "    print(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Operations over DF Pipe is captured"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "var_3 = input_var_2.i * 100\n",
      "var_4 = var_3 + input_var_2.j\n",
      "var_5 = var_4 - 2.7\n",
      "input_var_2[\"y\"] = var_5\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
    "print(dp.ops_str())\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Captured operations executed on `__next__` calls of constructed DataPipe"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "   i  j      y\n",
      "0  0  0   -2.7\n",
      "1  1  1   98.3\n",
      "2  2  2  199.3\n",
      "   i  j      y\n",
      "0  3  0  297.3\n",
      "1  4  1  398.3\n",
      "2  5  2  499.3\n",
      "   i  j      y\n",
      "0  6  0  597.3\n",
      "1  7  1  698.3\n",
      "2  8  2  799.3\n",
      "   i  j      y\n",
      "0  9  0  897.3\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
    "for i in dp.raw_iterator():\n",
    "    print(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`shuffle` of DataFramePipe effects rows in individual manner"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Raw DataFrames iterator\n",
      "   i  j\n",
      "2  8  2\n",
      "2  2  2\n",
      "2  5  2\n",
      "   i  j\n",
      "1  4  1\n",
      "1  1  1\n",
      "0  3  0\n",
      "   i  j\n",
      "1  7  1\n",
      "0  9  0\n",
      "0  6  0\n",
      "   i  j\n",
      "0  0  0\n",
      "Regular DataFrames iterator\n",
      "(1, 1)\n",
      "(5, 2)\n",
      "(8, 2)\n",
      "(9, 0)\n",
      "(7, 1)\n",
      "(6, 0)\n",
      "(3, 0)\n",
      "(4, 1)\n",
      "(0, 0)\n",
      "(2, 2)\n",
      "Regular iterator\n",
      "(5, 2)\n",
      "(6, 0)\n",
      "(0, 0)\n",
      "(9, 0)\n",
      "(3, 0)\n",
      "(1, 1)\n",
      "(2, 2)\n",
      "(8, 2)\n",
      "(4, 1)\n",
      "(7, 1)\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp = dp.shuffle()\n",
    "print('Raw DataFrames iterator')\n",
    "for i in dp.raw_iterator():\n",
    "    print(i)\n",
    "\n",
    "print('Regular DataFrames iterator')\n",
    "for i in dp:\n",
    "    print(i)\n",
    "\n",
    "\n",
    "# this is similar to shuffle of regular DataPipe\n",
    "dp = get_regular_pipe()\n",
    "dp = dp.shuffle()\n",
    "print('Regular iterator')\n",
    "for i in dp:\n",
    "    print(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can continue mixing DF and DP operations"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "    i   j          y\n",
      "0 -17 -17  -197000.0\n",
      "1 -13 -16  3813000.0\n",
      "0 -11 -17  5803000.0\n",
      "    i   j          y\n",
      "2 -12 -15  4823000.0\n",
      "1 -10 -16  6813000.0\n",
      "1 -16 -16   813000.0\n",
      "    i   j          y\n",
      "0  -8 -17  8803000.0\n",
      "2  -9 -15  7823000.0\n",
      "0 -14 -17  2803000.0\n",
      "    i   j          y\n",
      "2 -15 -15  1823000.0\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
    "dp = dp.shuffle()\n",
    "dp = dp - 17\n",
    "dp['y'] = dp.y * 10000\n",
    "for i in dp.raw_iterator():\n",
    "    print(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Batching combines everything into `list` it is possible to nest `list`s. List may have any number of DataFrames as soon as total number of rows equal to batch size."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Iterate over DataFrame batches\n",
      "[(6, 0),(0, 0)]\n",
      "[(4, 1),(1, 1)]\n",
      "[(2, 2),(9, 0)]\n",
      "[(3, 0),(5, 2)]\n",
      "[(7, 1),(8, 2)]\n",
      "Iterate over regular batches\n",
      "[(1, 1),(4, 1)]\n",
      "[(2, 2),(3, 0)]\n",
      "[(6, 0),(7, 1)]\n",
      "[(8, 2),(0, 0)]\n",
      "[(5, 2),(9, 0)]\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp = dp.shuffle()\n",
    "dp = dp.batch(2)\n",
    "print(\"Iterate over DataFrame batches\")\n",
    "for v in dp:\n",
    "    print(v)\n",
    "\n",
    "# this is similar to batching of regular DataPipe\n",
    "dp = get_regular_pipe()\n",
    "dp = dp.shuffle()\n",
    "dp = dp.batch(2)\n",
    "print(\"Iterate over regular batches\")\n",
    "for i in dp:\n",
    "    print(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Some details about internal storage of batched DataFrames and how they are iterated"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
      "As string:  [(0, 0),(3, 0)]\n",
      "Iterated regularly:\n",
      "-- batch start --\n",
      "(0, 0)\n",
      "(3, 0)\n",
      "-- batch end --\n",
      "Iterated in inner format (for developers):\n",
      "-- df batch start --\n",
      "   i  j\n",
      "0  0  0\n",
      "0  3  0\n",
      "-- df batch end --\n",
      "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
      "As string:  [(6, 0),(1, 1)]\n",
      "Iterated regularly:\n",
      "-- batch start --\n",
      "(6, 0)\n",
      "(1, 1)\n",
      "-- batch end --\n",
      "Iterated in inner format (for developers):\n",
      "-- df batch start --\n",
      "   i  j\n",
      "0  6  0\n",
      "1  1  1\n",
      "-- df batch end --\n",
      "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
      "As string:  [(9, 0),(4, 1)]\n",
      "Iterated regularly:\n",
      "-- batch start --\n",
      "(9, 0)\n",
      "(4, 1)\n",
      "-- batch end --\n",
      "Iterated in inner format (for developers):\n",
      "-- df batch start --\n",
      "   i  j\n",
      "0  9  0\n",
      "1  4  1\n",
      "-- df batch end --\n",
      "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
      "As string:  [(5, 2),(2, 2)]\n",
      "Iterated regularly:\n",
      "-- batch start --\n",
      "(5, 2)\n",
      "(2, 2)\n",
      "-- batch end --\n",
      "Iterated in inner format (for developers):\n",
      "-- df batch start --\n",
      "   i  j\n",
      "2  5  2\n",
      "2  2  2\n",
      "-- df batch end --\n",
      "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
      "As string:  [(8, 2),(7, 1)]\n",
      "Iterated regularly:\n",
      "-- batch start --\n",
      "(8, 2)\n",
      "(7, 1)\n",
      "-- batch end --\n",
      "Iterated in inner format (for developers):\n",
      "-- df batch start --\n",
      "   i  j\n",
      "2  8  2\n",
      "1  7  1\n",
      "-- df batch end --\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp = dp.shuffle()\n",
    "dp = dp.batch(2)\n",
    "for i in dp:\n",
    "    print(\"Type: \", type(i))\n",
    "    print(\"As string: \", i)\n",
    "    print(\"Iterated regularly:\")\n",
    "    print('-- batch start --')\n",
    "    for item in i:\n",
    "        print(item)\n",
    "    print('-- batch end --')\n",
    "    print(\"Iterated in inner format (for developers):\")\n",
    "    print('-- df batch start --')\n",
    "    for item in i.raw_iterator():\n",
    "        print(item)\n",
    "    print('-- df batch end --')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`concat` should work only of DF with same schema, this code should produce an error "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO!\n",
    "# dp0 = get_dataframes_pipe(range = 8, dataframe_size = 4)\n",
    "# dp = get_dataframes_pipe(range = 6, dataframe_size = 3)\n",
    "# dp['y'] = dp.i * 100 + dp.j - 2.7\n",
    "# dp = dp.concat(dp0)\n",
    "# for i,v in enumerate(dp.raw_iterator()):\n",
    "#     print(v)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`unbatch` of `list` with DataFrame works similarly to regular unbatch.\n",
    "Note: DataFrame sizes might change"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "ename": "AttributeError",
     "evalue": "",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mAttributeError\u001b[0m                            Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-12-fa80e9c68655>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m      4\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      5\u001b[0m \u001b[0;31m# Here is bug with unbatching which doesn't detect DF type.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 6\u001b[0;31m \u001b[0mdp\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'z'\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0my\u001b[0m \u001b[0;34m-\u001b[0m \u001b[0;36m100\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m      7\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      8\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mi\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mraw_iterator\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/dataset/pytorch/torch/utils/data/dataset.py\u001b[0m in \u001b[0;36m__getattr__\u001b[0;34m(self, attribute_name)\u001b[0m\n\u001b[1;32m    222\u001b[0m             \u001b[0;32mreturn\u001b[0m \u001b[0mfunction\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    223\u001b[0m         \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 224\u001b[0;31m             \u001b[0;32mraise\u001b[0m \u001b[0mAttributeError\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    225\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    226\u001b[0m     \u001b[0;32mdef\u001b[0m \u001b[0m__reduce_ex__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mAttributeError\u001b[0m: "
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(range = 18, dataframe_size = 3)\n",
    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
    "dp = dp.batch(5).batch(3).batch(1).unbatch(unbatch_level = 3)\n",
    "\n",
    "# Here is bug with unbatching which doesn't detect DF type.\n",
    "dp['z'] = dp.y - 100\n",
    "\n",
    "for i in dp.raw_iterator():\n",
    "    print(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`map` applied to individual rows, `nesting_level` argument used to penetrate batching"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Iterate over DataFrame batches\n",
      "[(1111000, 1111000),(1112000, 1112000),(1113000, 1113000),(1114000, 1111000),(1115000, 1112000)]\n",
      "[(1116000, 1113000),(1117000, 1111000),(1118000, 1112000),(1119000, 1113000),(1120000, 1111000)]\n",
      "Iterate over regular batches\n",
      "[(1111000, 0),(1112000, 1),(1113000, 2),(1114000, 0),(1115000, 1)]\n",
      "[(1116000, 2),(1117000, 0),(1118000, 1),(1119000, 2),(1120000, 0)]\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(range = 10, dataframe_size = 3)\n",
    "dp = dp.map(lambda x: x + 1111)\n",
    "dp = dp.batch(5).map(lambda x: x * 1000, nesting_level = 1)\n",
    "print(\"Iterate over DataFrame batches\")\n",
    "for i in dp:\n",
    "    print(i)\n",
    "\n",
    "# Similarly works on row level for classic DataPipe elements\n",
    "dp = get_regular_pipe(range = 10)\n",
    "dp = dp.map(lambda x: (x[0] + 1111, x[1]))\n",
    "dp = dp.batch(5).map(lambda x: (x[0] * 1000, x[1]), nesting_level = 1)\n",
    "print(\"Iterate over regular batches\")\n",
    "for i in dp:\n",
    "    print(i)\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`filter` applied to individual rows, `nesting_level` argument used to penetrate batching"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Iterate over DataFrame batches\n",
      "[(6, 0),(7, 1),(8, 2),(9, 0),(10, 1)]\n",
      "[(11, 2),(12, 0)]\n",
      "Iterate over regular batches\n",
      "[(6, 0),(7, 1),(8, 2),(9, 0),(10, 1)]\n",
      "[(11, 2),(12, 0)]\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(range = 30, dataframe_size = 3)\n",
    "dp = dp.filter(lambda x: x.i > 5)\n",
    "dp = dp.batch(5).filter(lambda x: x.i < 13, nesting_level = 1)\n",
    "print(\"Iterate over DataFrame batches\")\n",
    "for i in dp:\n",
    "    print(i)\n",
    "\n",
    "# Similarly works on row level for classic DataPipe elements\n",
    "dp = get_regular_pipe(range = 30)\n",
    "dp = dp.filter(lambda x: x[0] > 5)\n",
    "dp = dp.batch(5).filter(lambda x: x[0] < 13, nesting_level = 1)\n",
    "print(\"Iterate over regular batches\")\n",
    "for i in dp:\n",
    "    print(i)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.6.10 64-bit ('dataloader': conda)",
   "name": "python3610jvsc74a57bd0eb5e09632d6ea1cbf3eb9da7e37b7cf581db5ed13074b21cc44e159dc62acdab"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.10"
  },
  "orig_nbformat": 2
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
